package cn.gupao.jobs;

import cn.gupao.pojo.LogBean;
import cn.gupao.udfs.JsonToBeanFunction;
import cn.gupao.udfs.RulesMatchFunction;
import cn.gupao.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/**
 * A first Flink job with the matching rules hard-coded in the program.
 * <p>
 * Rule pipeline (evaluated by {@code RulesMatchFunction}):
 * 1. Check that the current eventId is "E" and that properties contain p2 = v1.
 * 2. If step 1 matches, query HBase and match the user profile
 *    (profile condition: {'p8':'v2','p10':'v2'}).
 * 3. If step 2 matches, query ClickHouse: within the last 3 days, event "A"
 *    with properties {'p1':'v1'} occurred at least 2 times.
 * If all steps match, emit that device 000001 matched the rule.
 */
public class Demo1 {

    /**
     * Entry point: builds and submits the streaming job.
     *
     * @param args args[0] is the path to the job configuration file consumed by
     *             {@code FlinkUtils.createKafkaStream} (Kafka connection/topic settings)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {

        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
        // when the required config-file argument is missing.
        if (args.length < 1) {
            throw new IllegalArgumentException(
                    "Usage: Demo1 <config-file-path> (path to the Kafka/job properties file)");
        }

        // Raw JSON strings from Kafka, deserialized with SimpleStringSchema.
        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args[0], SimpleStringSchema.class);

        // Parse/clean each JSON record into a LogBean.
        SingleOutputStreamOperator<LogBean> beanStream = kafkaStream.process(new JsonToBeanFunction());

        // Key by deviceId so per-device state can be kept, then run the rule matcher.
        KeyedStream<LogBean, String> keyedStream = beanStream.keyBy(LogBean::getDeviceId);
        SingleOutputStreamOperator<String> res = keyedStream.process(new RulesMatchFunction());

        res.print();

        // Submit the job on the shared environment created by FlinkUtils.
        FlinkUtils.env.execute();
    }

}
